#include "ibms_in.h"

/*
 * Termination flag for the worker() event loop: the loop keeps running while
 * this is zero, and worker() itself sets it to 1 when the pipe session
 * reports an error/hang-up or reading from it fails.
 * NOTE(review): the symbol has external linkage, so it may also be written
 * from other translation units (e.g. a signal handler) - not visible here.
 */
signed char		g_exit_flag = 0 ;

int worker( struct IbmsEnv *p_env )
{
	struct epoll_event	event ;
	
	struct epoll_event	events[ MAX_EPOLL_EVENTS ] ;
	int			epoll_nfds ;
	int			i ;
	struct epoll_event	*p_event = NULL ;
	
	struct ListenSession	*p_listen_session = NULL ;
	struct PipeSession	*p_pipe_session = NULL ;
	struct AcceptedSession	*p_accepted_session = NULL ;
	
	int			nret = 0 ;
	
	IBPCleanLogEnv();
	IBPInitLogEnv( p_env->ibms_conf.ibms.log.iblog_server , "ibms.worker" , p_env->ibms_conf.ibms.log.event_output , IBPConvertLogLevel(p_env->ibms_conf.ibms.log.worker_loglevel) , "%s" , p_env->ibms_conf.ibms.log.worker_output );
	
	p_env->epoll_fd = epoll_create( 1024 ) ;
	if( p_env->epoll_fd == -1 )
	{
		ERRORLOGSG( "epoll_create failed , errno[%d]" , errno );
		return -1;
	}
	else
	{
		DEBUGLOGSG( "epoll_create ok , fd[%d]" , p_env->epoll_fd );
	}
	
	memset( & event , 0x00 , sizeof(struct epoll_event) );
	event.events = EPOLLIN | EPOLLERR ;
	event.data.ptr = & (p_env->listen_session) ;
	nret = epoll_ctl( p_env->epoll_fd , EPOLL_CTL_ADD , p_env->listen_session.netaddr.sock , & event ) ;
	if( nret == -1 )
	{
		ERRORLOGSG( "epoll_ctl[%d] add listen_session failed , errno[%d]" , p_env->epoll_fd , errno );
		close( p_env->epoll_fd );
		return -1;
	}
	else
	{
		INFOLOGSG( "epoll_ctl[%d] add listen_session[%d]" , p_env->epoll_fd , p_env->listen_session.netaddr.sock );
	}
	
	memset( & event , 0x00 , sizeof(struct epoll_event) );
	event.events = EPOLLIN | EPOLLERR ;
	event.data.ptr = & (p_env->pipe_session) ;
	nret = epoll_ctl( p_env->epoll_fd , EPOLL_CTL_ADD , p_env->pipe_session.pipe_fds[0] , & event ) ;
	if( nret == -1 )
	{
		ERRORLOGSG( "epoll_ctl[%d] add pipe_session[%d] failed , errno[%d]" , p_env->epoll_fd , p_env->pipe_session.pipe_fds[0] , errno );
		close( p_env->epoll_fd );
		return -1;
	}
	else
	{
		INFOLOGSG( "epoll_ctl[%d] add pipe_session[%d]" , p_env->epoll_fd , p_env->pipe_session.pipe_fds[0] );
	}
	
	while( ! g_exit_flag )
	{
		INFOLOGSG( "epoll_wait[%d] ..." , p_env->epoll_fd );
		memset( events , 0x00 , sizeof(events) );
		epoll_nfds = epoll_wait( p_env->epoll_fd , events , MAX_EPOLL_EVENTS , 1000 ) ;
		if( epoll_nfds == -1 )
		{
			if( errno == EINTR )
			{
				INFOLOGSG( "epoll_wait[%d] interrupted" , p_env->epoll_fd );
			}
			else
			{
				ERRORLOGSG( "epoll_wait[%d] failed , errno[%d]" , p_env->epoll_fd , ERRNO );
			}
			
			return -1;
		}
		else
		{
			INFOLOGSG( "epoll_wait[%d] return[%d]events" , p_env->epoll_fd , epoll_nfds );
		}
		
		for( i = 0 , p_event = events ; i < epoll_nfds ; i++ , p_event++ )
		{
			if( p_event->data.ptr == & (p_env->listen_session) )
			{
				p_listen_session = (struct ListenSession *)(p_event->data.ptr) ;
				
				if( p_event->events & EPOLLIN )
				{
					nret = OnAcceptingSocket( p_env , p_listen_session ) ;
					if( nret < 0 )
					{
						FATALLOGSG( "OnAcceptingSocket failed[%d]" , nret );
						return -1;
					}
					else if( nret > 0 )
					{
						INFOLOGSG( "OnAcceptingSocket return[%d]" , nret );
					}
					else
					{
						DEBUGLOGSG( "OnAcceptingSocket ok" );
					}
				}
				else if( ( p_event->events & EPOLLERR ) || ( p_event->events & EPOLLHUP ) )
				{
					FATALLOGSG( "listen session err or hup event[0x%X]" , p_event->events );
					return -1;
				}
				else
				{
					FATALLOGSG( "Unknow listen session event[0x%X]" , p_event->events );
					return -1;
				}
			}
			else if( p_event->data.ptr == & (p_env->pipe_session) )
			{
				p_pipe_session = (struct PipeSession *)(p_event->data.ptr) ;
				
				if( ( p_event->events & EPOLLIN ) )
				{
					char		ch ;
					
					char		*file_content = NULL ;
					int		file_len ;
					ibms_conf	ibms_conf ;
					
					nret = (int)read( p_env->pipe_session.pipe_fds[0] , & ch , 1 );
					if( nret == -1 )
					{
						ERRORLOGSG( "read pipe failed[%d] , errno[%d]" , nret , errno );
						goto _GOTO_CLOSE_PIPE;
					}
					else
					{
						INFOLOGSG( "read pipe ok , ch[%c]" , ch );
					}
					
					file_content = StrdupEntireFile( p_env->ibms_conf_pathfilename , & file_len ) ;
					if( file_content == NULL )
					{
						ERRORLOGSG( "StrdupEntireFile[%s] failed" , p_env->ibms_conf_pathfilename );
						continue;
					}
					
					memset( & ibms_conf , 0x00 , sizeof(ibms_conf) );
					nret = DSCDESERIALIZE_JSON_ibms_conf( "GB18030" , file_content , & file_len , & ibms_conf ) ;
					free( file_content );
					if( nret )
					{
						ERRORLOGSG( "DSCDESERIALIZE_JSON_ibms_conf[%s] failed[%d]" , p_env->ibms_conf_pathfilename , nret );
						continue;
					}
					
					strcpy( p_env->ibms_conf.ibms.log.iblog_server , ibms_conf.ibms.log.iblog_server );
					strcpy( p_env->ibms_conf.ibms.log.event_output , ibms_conf.ibms.log.event_output );
					strcpy( p_env->ibms_conf.ibms.log.worker_loglevel , ibms_conf.ibms.log.worker_loglevel );
					strcpy( p_env->ibms_conf.ibms.log.worker_output , ibms_conf.ibms.log.worker_output );
					
					INFOLOGSG( "InitLogEnv iblog_server[%s] event_output[%s] worker_output[%s][%s]" , p_env->ibms_conf.ibms.log.iblog_server , p_env->ibms_conf.ibms.log.event_output , p_env->ibms_conf.ibms.log.worker_loglevel , p_env->ibms_conf.ibms.log.worker_output );
					IBPCleanLogEnv();
					IBPInitLogEnv( p_env->ibms_conf.ibms.log.iblog_server , "ibms.worker" , p_env->ibms_conf.ibms.log.event_output , IBPConvertLogLevel(ibms_conf.ibms.log.worker_loglevel) , "%s" , p_env->ibms_conf.ibms.log.worker_output );
					INFOLOGSG( "InitLogEnv iblog_server[%s] event_output[%s] worker_output[%s][%s]" , p_env->ibms_conf.ibms.log.iblog_server , p_env->ibms_conf.ibms.log.event_output , p_env->ibms_conf.ibms.log.worker_loglevel , p_env->ibms_conf.ibms.log.worker_output );
				}
				else if( ( p_event->events & EPOLLERR ) || ( p_event->events & EPOLLHUP ) )
				{
_GOTO_CLOSE_PIPE :
					epoll_ctl( p_env->epoll_fd , EPOLL_CTL_DEL , p_env->pipe_session.pipe_fds[0] , NULL );
					close( p_env->pipe_session.pipe_fds[0] );
					
					g_exit_flag = 1 ;
				}
				else
				{
					FATALLOGSG( "Unknow pipe session event[0x%X]" , p_event->events );
					return -1;
				}
			}
			else
			{
				p_accepted_session = (struct AcceptedSession *)(p_event->data.ptr) ;
				
				if( p_event->events & EPOLLIN )
				{
					nret = OnReceivingSocket( p_env , p_accepted_session ) ;
					if( nret < 0 )
					{
						FATALLOGSG( "OnReceivingSocket failed[%d]" , nret );
						return -1;
					}
					else if( nret > 0 )
					{
						INFOLOGSG( "OnReceivingSocket return[%d]" , nret );
						OnClosingSocket( p_env , p_accepted_session );
					}
					else
					{
						DEBUGLOGSG( "OnReceivingSocket ok" );
					}
				}
				else if( p_event->events & EPOLLOUT )
				{
					nret = OnSendingSocket( p_env , p_accepted_session ) ;
					if( nret < 0 )
					{
						FATALLOGSG( "OnSendingSocket failed[%d]" , nret );
						return -1;
					}
					else if( nret > 0 )
					{
						INFOLOGSG( "OnSendingSocket return[%d]" , nret );
						OnClosingSocket( p_env , p_accepted_session );
					}
					else
					{
						DEBUGLOGSG( "OnSendingSocket ok" );
					}
				}
				else if( ( p_event->events & EPOLLERR ) || ( p_event->events & EPOLLHUP ) )
				{
					FATALLOGSG( "accepted session err or hup event[0x%X]" , p_event->events );
					OnClosingSocket( p_env , p_accepted_session );
				}
				else
				{
					FATALLOGSG( "Unknow accepted session event[0x%X]" , p_event->events );
					return -1;
				}
			}
		}
	}
	
	INFOLOGSG( "close listen[%d]" , p_env->listen_session.netaddr.sock );
	close( p_env->listen_session.netaddr.sock );
	
	INFOLOGSG( "close epoll_fd[%d]" , p_env->epoll_fd );
	close( p_env->epoll_fd );
	
	return 0;
}

